package com.k2data.platform.backup.consumer;

import com.k2data.common.EnvConf;
import com.k2data.platform.backup.common.Constants;
import com.k2data.platform.backup.common.InitConfigForBackup;
import com.k2data.platform.backup.common.ParamNames;
import com.k2data.platform.backup.rotation.CountRotationPolicy;
import com.k2data.platform.backup.rotation.CountSyncPolicy;
import com.k2data.platform.backup.rotation.SyncPolicy;
import com.k2de.consumer.impl.EmptyMessageToRecordConverterImpl;
import com.k2de.consumer.impl.KafkaConsumer;
import com.k2de.consumer.inter.MessageToRecordConverterInter;
import com.k2de.consumer.result.KmxRecordResult;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Created by ChenJingShuai on 2018/1/18.
 */
/**
 * Kafka consumer that backs up consumed messages to HDFS.
 *
 * <p>Each consumer instance appends messages (one per line) to a
 * {@code .processing} file, flushes every {@code flushThreshold} messages and,
 * every {@code rotationThreshold} messages, closes the file, renames it to
 * {@code .complete} and commits the Kafka offsets. {@link #main(String[])}
 * starts one consumer per topic partition in a fixed thread pool.
 *
 * <p>Created by ChenJingShuai on 2018/1/18.
 */
public class KafkaConsumerToHDFS extends KafkaConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerToHDFS.class);

    private static final String HDFS_SEPARATOR = "/";
    private static final String KEY_SEPARATOR = "_";
    private static final String SUFFIX_PROCESSING = ".processing";
    private static final String SUFFIX_COMPLETE = ".complete";

    // Thread pool running one consumer per topic partition; created in main().
    private static ExecutorService executor;

    // Compression codec name: "nocomp", "gzip" or "bzip2".
    private final String codec;
    private final Configuration conf = new Configuration();
    // Target HDFS directory derived from the consumed key prefix.
    private final String targetHdfsPath;

    // Index of this consumer thread; used as a prefix in generated file names.
    private final int threadNum;
    private final SyncPolicy syncPolicy;
    private final CountRotationPolicy rotationPolicy;

    /**
     * Creates a consumer writing to an HA HDFS cluster.
     *
     * @param zkUrl                  ZooKeeper connection string for the Kafka consumer
     * @param hdfsUrl                two comma-separated namenode RPC addresses
     * @param consumedKafkaKeyPrefix key prefix; '_' segments map to HDFS sub-directories
     * @param groupName              Kafka consumer group name
     * @param codec                  compression codec: "nocomp", "gzip" or "bzip2"
     * @param threadNum              index of this consumer thread
     * @param flushThreshold         messages written between flushes
     * @param rotationThreshold      messages written per output file before rotation
     * @throws IllegalArgumentException if {@code hdfsUrl} does not contain two addresses
     */
    public KafkaConsumerToHDFS(String zkUrl, String hdfsUrl, String consumedKafkaKeyPrefix, String groupName, String codec, int threadNum,
                               long flushThreshold, long rotationThreshold) {
        super(zkUrl, consumedKafkaKeyPrefix, groupName, "smallest", false);
        this.codec = codec;
        this.threadNum = threadNum;
        this.syncPolicy = new CountSyncPolicy(flushThreshold);
        this.rotationPolicy = new CountRotationPolicy(rotationThreshold);

        targetHdfsPath = EnvConf.getEvnValue(ParamNames.HDFS_PATH, Constants.HDFS_PATH) + HDFS_SEPARATOR +
                consumedKafkaKeyPrefix.replace(KEY_SEPARATOR, HDFS_SEPARATOR);

        // Configure an HA HDFS client with two namenodes behind "nameservice1".
        String[] hdfsUrls = hdfsUrl.split(",");
        if (hdfsUrls.length < 2) {
            // Fail fast with a clear message instead of an ArrayIndexOutOfBoundsException below.
            throw new IllegalArgumentException(
                    "Expected two comma-separated namenode addresses but got [" + hdfsUrl + "]");
        }
        conf.set("fs.defaultFS", "hdfs://nameservice1");
        conf.set("fs.default.name", conf.get("fs.defaultFS"));
        conf.set("dfs.nameservices", "nameservice1");
        conf.set("dfs.ha.namenodes.nameservice1", "namenode1,namenode2");
        conf.set("dfs.namenode.rpc-address.nameservice1.namenode1", hdfsUrls[0]);
        conf.set("dfs.namenode.rpc-address.nameservice1.namenode2", hdfsUrls[1]);
        conf.set("dfs.client.failover.proxy.provider.nameservice1",
                "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
        );
        conf.set("fs.hdfs.impl", DistributedFileSystem.class.getName());
        conf.set("fs.file.impl", LocalFileSystem.class.getName());
    }

    /**
     * Shuts down the consumer thread pool, waiting up to five seconds for the
     * running consumers to finish. Safe to call when the pool was never created.
     */
    public static void shutdown() {
        if (executor == null) {
            return;
        }
        executor.shutdown();
        try {
            if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                LOG.info("Timed out waiting for consumer threads to shut down, exiting uncleanly");
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt status for the caller (we are in a shutdown hook).
            Thread.currentThread().interrupt();
            LOG.info("Interrupted during shutdown, exiting uncleanly");
        }
    }

    /**
     * Logs the resolved configuration value, or terminates the process when it
     * is missing — these settings are mandatory for the backup service.
     */
    private static void checkConfig(String name, String value) {
        if (value == null || value.isEmpty()) {
            LOG.error("System environment [{}] is missing", name);
            System.exit(-1);
        } else {
            LOG.info("System environment [{}] is [{}]", name, value);
        }
    }

    /**
     * Builds a unique file name such as "0-1497954544729.bzip2": thread index,
     * creation time in millis, then the codec name as suffix (omitted for "nocomp").
     */
    private String getFileName(String codec) {
        long now = System.currentTimeMillis();
        // "nocomp".equals(codec) also guards against a null codec.
        return "nocomp".equals(codec)
                ? this.threadNum + "-" + now
                : this.threadNum + "-" + now + "." + codec;
    }

    /**
     * Wraps {@code fout} in a compressing stream according to {@code compType}
     * ("gzip" or "bzip2"); returns it unchanged for "nocomp" or unknown values.
     */
    private OutputStream wrapOutputStreamByCodec(OutputStream fout, String compType, Configuration conf) throws IOException {
        switch (compType) {
            case "nocomp":
                return fout;
            case "gzip": {
                GzipCodec gzip = new GzipCodec();
                gzip.setConf(conf);
                return gzip.createOutputStream(fout);
            }
            case "bzip2": {
                BZip2Codec bzip2 = new BZip2Codec();
                bzip2.setConf(conf);
                return bzip2.createOutputStream(fout);
            }
            default:
                LOG.warn("CompType not recognized");
                return fout;
        }
    }

    /**
     * Per-consumer write state used by {@link #sendKmxRecordResult(KmxRecordResult)}.
     * Reset by {@link #resetWriteState()} after an exception so the next message
     * starts a brand-new file.
     */
    private FileSystem fileSystem = null;
    private OutputStream outputStream = null;
    private String targetFile = null;
    private Path processingPath = null;
    private boolean isCreateNewFile = true;

    /**
     * Appends one message (plus a trailing newline) to the current ".processing"
     * file, flushing and rotating according to the configured policies. On
     * rotation the file is renamed to ".complete" and the offsets are committed,
     * so a ".processing" file always marks not-yet-committed data.
     */
    @Override
    public void sendKmxRecordResult(KmxRecordResult kmxRecordResult) {
        String originalMessage = kmxRecordResult.getOriginalMessage();
        boolean isExceptionOccurs = false;
        try {
            if (fileSystem == null) {
                fileSystem = FileSystem.get(conf);
            }

            if (isCreateNewFile) {
                targetFile = targetHdfsPath + HDFS_SEPARATOR + getFileName(codec);
                processingPath = new Path(targetFile + SUFFIX_PROCESSING);

                LOG.debug("File named {} is created.", processingPath);

                outputStream = wrapOutputStreamByCodec(fileSystem.create(processingPath), codec, conf);
                isCreateNewFile = false;
            }

            // Encode explicitly as UTF-8; the no-arg getBytes() depends on the platform charset.
            outputStream.write(originalMessage.getBytes(StandardCharsets.UTF_8));
            outputStream.write('\n');

            if (this.syncPolicy.mark(originalMessage)) {
                outputStream.flush();
                LOG.debug("Thread{}: Flush {} message(s) to hdfs({}).", threadNum, syncPolicy.getThreshold(),
                        processingPath);
                this.syncPolicy.reset();
            }

            if (this.rotationPolicy.mark(originalMessage)) {
                outputStream.close();
                Path completePath = new Path(targetFile + SUFFIX_COMPLETE);
                // Tag the completion; FileSystem.rename reports failure via its return value.
                if (!fileSystem.rename(processingPath, completePath)) {
                    LOG.warn("Failed to rename {} to {}.", processingPath, completePath);
                }
                commitOffsets();

                LOG.debug("Thread{}: Rename {} to {} and the new renamed file has {} messages.", threadNum,
                        processingPath, completePath, rotationPolicy.getThreshold());

                if (Constants.STOP_SERVICE_FILE.exists()) {
                    // Operator requested a stop; pause between rotations so shutdown can proceed.
                    Thread.sleep(1000);
                    LOG.info("Waiting for consumer stop because {} dir is created.", Constants.STOP_SERVICE_FILE);
                }

                this.rotationPolicy.reset();
                isCreateNewFile = true;
            }
        } catch (IOException e) {
            // Log with the full stack trace instead of just the message.
            LOG.warn("Occurs exception:{}.", e.getMessage(), e);
            isExceptionOccurs = true;
        } catch (InterruptedException e) {
            // Preserve the interrupt status so the consumer loop can observe it.
            Thread.currentThread().interrupt();
            LOG.warn("Occurs exception:{}.", e.getMessage(), e);
            isExceptionOccurs = true;
        } finally {
            // Close and clear the write state; leaving the closed stream in the
            // fields would make the next invocation write to a dead handle.
            if (isExceptionOccurs) {
                resetWriteState();
            }
        }
    }

    /** Closes stream and file system quietly, then forces a fresh file next call. */
    private void resetWriteState() {
        try {
            if (outputStream != null) {
                outputStream.close();
            }
        } catch (IOException e) {
            LOG.warn("Closing stream occurs exception:{}.", e.getMessage());
        } finally {
            outputStream = null;
        }
        try {
            if (fileSystem != null) {
                fileSystem.close();
            }
        } catch (IOException e) {
            LOG.warn("Closing stream occurs exception:{}.", e.getMessage());
        } finally {
            fileSystem = null;
        }
        isCreateNewFile = true;
    }

    public static void main(String[] args) {
        if (args.length < 1) {
            System.err.println("Usage: " + KafkaConsumerToHDFS.class.getSimpleName() + " <kafkaKeyTag>");
            System.exit(-1);
        }
        String consumedKafkaKeyPrefix = args[0];

        // Check required configuration; checkConfig() exits the JVM on a missing value.
        String zkUrl = EnvConf.getEvnValue(ParamNames.ZOOKEEPER_URL, "");
        checkConfig(ParamNames.ZOOKEEPER_URL, zkUrl);
        final String topic = EnvConf.getEvnValue(ParamNames.KAFKA_TOPIC);
        checkConfig(ParamNames.KAFKA_TOPIC, topic);
        String brokers = EnvConf.getEvnValue(ParamNames.KAFKA_BROKERS);
        checkConfig(ParamNames.KAFKA_BROKERS, brokers);
        String hdfsUrl = InitConfigForBackup.HDFS_URL;
        checkConfig(ParamNames.HDFS_URL, hdfsUrl);
        String groupName = EnvConf.getEvnValue(ParamNames.GROUP_NAME);
        checkConfig(ParamNames.GROUP_NAME, groupName);

        // Remove a stale stop-service marker left over from a previous run.
        if (Constants.STOP_SERVICE_FILE.exists()) {
            LOG.info("init delete stop file.");
            if (!Constants.STOP_SERVICE_FILE.delete()) {
                LOG.warn("Failed to delete stop file {}.", Constants.STOP_SERVICE_FILE);
            }
        }

        // One consumer thread per topic partition.
        MetaDataConsumer metaDataConsumer = new MetaDataConsumer(Arrays.asList(brokers.split(",")));
        int threadCount = metaDataConsumer.getPartitionNum(topic);
        LOG.info("threads number is {}", threadCount);

        // Run the consumers in a fixed-size thread pool.
        executor = Executors.newFixedThreadPool(threadCount);
        MessageToRecordConverterInter emptyMessageToRecordConverter = new EmptyMessageToRecordConverterImpl();
        for (int i = 0; i < threadCount; i++) {
            final KafkaConsumerToHDFS kafkaConsumerToHDFS = new KafkaConsumerToHDFS(zkUrl, hdfsUrl,
                    consumedKafkaKeyPrefix, groupName, InitConfigForBackup.CODEC, i,
                    InitConfigForBackup.HDFS_SYNC_COUNT, InitConfigForBackup.HDFS_ROTATION_COUNT);
            kafkaConsumerToHDFS.setMessageToRecordConverter(emptyMessageToRecordConverter);

            executor.submit(new Runnable() {
                @Override
                public void run() {
                    kafkaConsumerToHDFS.consume(topic);
                }
            });
        }

        // Shutdown hook so SIGTERM / Ctrl-C drains the pool before exiting.
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                LOG.info("Exit gracefully...");
                shutdown();
            }
        });
    }
}
